Luis Muniz
06/26/2020, 9:54 AM
Luis Muniz
06/26/2020, 11:35 AM
prefect agent start
This results in an error complaining about a missing API token:
prefect.utilities.exceptions.AuthorizationError: No agent API token provided.
When I then call
prefect auth create-token -n calypso -s RUNNER
I see the following error:
Traceback (most recent call last):
File "/home/lmuniz/.local/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/cli/auth.py", line 201, in create_token
output = client.graphql(
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in graphql
result = self.post(
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 172, in post
response = self._request(
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 334, in _request
json_resp = response.json()
File "/usr/lib/python3/dist-packages/requests/models.py", line 897, in json
return complexjson.loads(self.text, **kwargs)
File "/usr/lib/python3/dist-packages/simplejson/__init__.py", line 518, in loads
return _default_decoder.decode(s)
File "/usr/lib/python3/dist-packages/simplejson/decoder.py", line 370, in decode
obj, end = self.raw_decode(s)
File "/usr/lib/python3/dist-packages/simplejson/decoder.py", line 400, in raw_decode
return self.scan_once(s, idx=_w(s, idx).end())
simplejson.errors.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
I have started Prefect with the recommended prefect server start command and installed Prefect with pip3.
$ prefect diagnostics
{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "Linux-5.4.0-7624-generic-x86_64-with-glibc2.29",
"prefect_version": "0.12.1",
"python_version": "3.8.2"
}
}
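The last frame in the traceback above is the JSON decoder failing at the very first character of the HTTP response body. That is the error any JSON parser raises when a response is empty or is not JSON at all (for example an HTML error page), so the endpoint the CLI talked to most likely did not answer with a GraphQL JSON payload. A minimal stdlib reproduction, using a hypothetical helper parse_graphql_body that is not part of Prefect:

```python
import json


def parse_graphql_body(body: str) -> dict:
    """Parse a GraphQL HTTP response body, with a clearer error if it isn't JSON.

    Hypothetical helper for illustration; not part of Prefect's client.
    """
    try:
        return json.loads(body)
    except json.JSONDecodeError as exc:
        # An empty body or an HTML page fails at line 1, column 1 (char 0).
        raise ValueError(
            f"Response was not JSON ({exc}); first 80 chars: {body[:80]!r}"
        ) from exc
```

Showing the start of the raw body this way is usually enough to tell an empty reply from an HTML page.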
Luis Muniz
06/26/2020, 12:21 PM
Hamzah Iqbal
06/26/2020, 1:44 PM
Alex Cano
06/26/2020, 2:06 PM
Luis Muniz
06/26/2020, 7:17 PM
map()? Is using map() to handle a list of thousands (or millions?) of elements one by one reasonable? Should I micro-batch it into chunks? Does Prefect start to choke if you have many, many tasks in a flow? What about the dashboard when you examine such a flow after it has run?
nicholas
06/26/2020, 8:04 PM
currently live on YouTube
matta
06/27/2020, 12:20 AM
Hui Zheng
06/27/2020, 1:24 AM
File "build_and_deploy.py", line 48, in <module>
env_vars=deployment_vars
File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/core/flow.py", line 1277, in deploy
version_group_id=version_group_id,
File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/client/client.py", line 577, in deploy
versionGroupId=version_group_id,
File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/client/client.py", line 222, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['createFlowFromCompressedString'], 'message': '1 validation error for FlowSchema\nschedule -> clocks -> 0 -> parameter_defaults\n field required (type=value_error.missing)', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I am using Prefect version 0.7.3 (yes, it’s old, but I have to stay with it for another month). The error was thrown at this line of code:
dbt_flow.deploy(
    '{}'.format(env),
    python_dependencies=[
        'google-cloud-firestore', 'python-dotenv', 'google-cloud-storage', 'environs'],
    files={
        path.abspath('./prefect_cloud_deployment/{}/gcp-scheduler-key.json'.format(env)): '/home',
    },
    registry_url='us.gcr.io/semios-dbt/scheduler',
    env_vars=deployment_vars
)
The code before that line, which might be related to this error:
# Flow auto-schedule
start_time = datetime.now()
start_time = start_time.replace(hour=(start_time.hour), minute=10, second=0, microsecond=0)
hourly_schedule = Schedule(clocks=[IntervalClock(interval=timedelta(minutes=60), start_date=start_time)])
dbt_flow.schedule = hourly_schedule
Alfie
06/27/2020, 2:31 AM
Jorge
06/27/2020, 9:31 AM
Sandeep Aggarwal
06/27/2020, 12:41 PM
PREFECT__LOGGING__LOG_ATTRIBUTES?
I am trying to send a unique ID while creating a flow run using the Python client. I want this UUID to be available in the logs for tracking the request flow. However, so far I haven't had any success in setting the extra log attributes correctly.
This is how I start the Docker agent:
prefect agent start docker -e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
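Environment variables always reach a process as strings, so "['uuid']" arrives as text unless something parses it into a list. If that is what is happening, ast.literal_eval is one stdlib way to normalize the value; parse_log_attributes is a hypothetical helper for illustration, not Prefect's own config parsing.

```python
import ast


def parse_log_attributes(raw):
    """Normalize a log-attributes setting to a list of strings.

    Hypothetical helper: accepts either a real list or the string form
    "['uuid']" that an environment variable delivers.
    """
    if isinstance(raw, str):
        # literal_eval only evaluates Python literals, never arbitrary code.
        value = ast.literal_eval(raw) if raw.strip() else []
    else:
        value = raw
    if not isinstance(value, (list, tuple)):
        raise TypeError(f"expected a list of attribute names, got {value!r}")
    return [str(v) for v in value]
```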
I tried debugging and found that the log attributes are fetched as a string here, instead of a list.
Cab Maddux
06/27/2020, 6:37 PM
Rafal
06/28/2020, 2:58 PM
itay livni
06/28/2020, 3:13 PM
Luis Muniz
06/28/2020, 10:40 PM
from tasks.collect.games import *
from tasks.collect.streamers import *
from tasks.collect.streams import *
from tasks.enrich.game import *
from tasks.enrich.streamer import *
from tasks.enrich.stream import *
from tasks.store.game import *
from tasks.store.streamer import *
from tasks.store.stream import *
from tasks.util.common import *

with Flow("STRDATA POC") as strdata:
    collected_games = collect_games()
    enriched_games = enrich_game.map(collected_games)
    collected_streamers = collect_streamers()
    enriched_streamers = enrich_streamer.map(collected_streamers)
    collected_streams = collect_streams_per_game.map(enriched_games, unmapped(enriched_streamers))
    enriched_streams = enrich_stream.map(flatten(collected_streams))
    store_game.map(enriched_games)
    store_stream.map(enriched_streams)
    store_streamer.map(enriched_streamers)
The flow runs OK when I run it standalone, but when I register it in my local Prefect server, I see the following error in the dashboard:
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'tasks'")
It seems similar to an issue I found about not being able to submit flows to Prefect Cloud because of some peculiarity with pickle:
https://github.com/PrefectHQ/prefect/issues/1742
But that issue was related to packaging the flow in a Docker image, so I can't apply the solution to my case.
The layout of my project is the following:
deploy
|_ prod
   |_ register.py (contains flow.register)
flows
|_ strdata_poc.py (contains flow definition - see above)
tasks
|_ collect
   |_ games.py
   |_ streamers.py
   |_ streams.py
|_ enrich
   |_ ...
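The dashboard error says the process that executes the flow cannot import the top-level tasks package. Functions defined in an importable module are generally pickled by reference (module name plus attribute), so unpickling the flow on the agent side needs tasks to be on that process's sys.path, for instance via PYTHONPATH pointing at the project root. Assuming that is the cause, a quick stdlib check to run with the agent's interpreter, working directory and environment might look like this (can_import is a hypothetical helper):

```python
import importlib.util
import sys


def can_import(module_name: str) -> bool:
    """Return True if module_name is importable from the current sys.path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # find_spec imports the parent first, so a dotted name whose
        # parent package is missing raises instead of returning None.
        return False


if __name__ == "__main__":
    # Run with the same interpreter and PYTHONPATH the agent uses.
    print("tasks importable:", can_import("tasks"))
    print("sys.path:", sys.path)
```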
Hui Zheng
06/29/2020, 12:27 AM
[libprotobuf ERROR google/protobuf/descriptor_database.cc:58] File already exists in database:
[libprotobuf FATAL google/protobuf/descriptor.cc:1370] CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):
libc++abi.dylib: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):
Abort trap: 6
The error is thrown when the code tries to import the flow with from main import dbt_flow, i.e. from the .py file that defines the Prefect flow. Below are some of the libraries imported in main.py:
from google.api_core.datetime_helpers import DatetimeWithNanoseconds
from prefect import task, Flow, Parameter, triggers, unmapped
from prefect.engine import signals, result_handlers
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime, MAXYEAR
from google.cloud import firestore
from os import walk, path, getenv
from environs import Env
from collections.abc import Iterable
from itertools import chain
from copy import deepcopy
from time import sleep
import requests
import argparse
import json
import pytz
Here is the Pipfile:
[packages]
prefect = "==0.7.3"
google-cloud-firestore = "==1.5.0"
python-dotenv = "==0.10.3"
environs = "==7.3.0"
[requires]
python_version = "3.7"
Chris Vrooman
06/29/2020, 3:10 AM
jars
06/29/2020, 3:44 AM
Alfie
06/29/2020, 10:58 AM
Darragh
06/29/2020, 11:04 AM
Unexpected error: KilledWorker('run_task-a6bb0ba3-e74f-4e26-8cc4-2bcc8f8c217c', <Worker 'tcp://127.0.0.1:41917', name: 1, memory: 0, processing: 1>)
It doesn’t seem to matter what the flow is; I’m getting this on all flows, even after restarting Prefect Server and the Fargate Agent.
Etienne
06/29/2020, 11:07 AM
flow_group GraphQL query available? Running 0.12.1.
Kevin Weiler
06/29/2020, 6:43 PM
prefect.config.api.url (env variable PREFECT__CONFIG__API__URL) and setting it in config.toml, but when I call .register() I get the following error:
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fbcbd594e10>: Failed to establish a new connection: [Errno 111] Connection refused'))
indicating that it’s still trying localhost instead of what I’ve given it (which does not resolve to localhost). Any ideas?
Philip MacMenamin
06/29/2020, 7:01 PM
Parameter in ShellTask
According to the docs, Parameter is a type of Task, and I'm guessing it needs to have its run method called before it's usable. I'm guessing that this method is called when you pass the parameter to a function with the @task decorator?
I have a shell task to which I'd like to pass an object (or a string) that is created using a Parameter object. Normally, shell tasks seem to be defined in the flow block. When I try to run a shell task in a flow with arguments that depend on a parameter, it fails with something like:
Traceback (most recent call last):
File "/usr/lib/python3.8/runpy.py", line 193, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.8/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/home/macmenaminpe/code/prefect/pdb_flow/parameterized_flow.py", line 45, in <module>
num_lines = s_task(command=f"wc -l {job.pdb_fp} > {job.job_dir}num_lines.txt")
File "/home/macmenaminpe/.local/lib/python3.8/site-packages/prefect/tasks/core/function.py", line 68, in __getattr__
raise AttributeError(f"'FunctionTask' object has no attribute {k}")
AttributeError: 'FunctionTask' object has no attribute pdb_fp
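The AttributeError is raised while the flow is being built: inside the Flow block, job is a FunctionTask placeholder, not the object the task will eventually return, so job.pdb_fp is evaluated too early. One common pattern is to assemble the command in its own task, so the string is built at run time, and pass that task's output as command= to the ShellTask. The string-building part is plain Python and is sketched here; build_wc_command and its pdb_fp/job_dir arguments are hypothetical names, and the Prefect wiring is shown only in comments.

```python
import os
import shlex


def build_wc_command(pdb_fp: str, job_dir: str) -> str:
    """Assemble the wc command once real paths are known (i.e. at run time)."""
    out_path = os.path.join(job_dir, "num_lines.txt")
    # shlex.quote protects paths containing spaces or shell metacharacters.
    return f"wc -l {shlex.quote(pdb_fp)} > {shlex.quote(out_path)}"


# Hypothetical Prefect wiring (not executed here):
#
# make_cmd = task(build_wc_command)
# s_task = ShellTask()
# with Flow("pdb") as flow:
#     pdb_fp = Parameter("pdb_fp")
#     job_dir = Parameter("job_dir")
#     cmd = make_cmd(pdb_fp, job_dir)   # resolved at run time
#     num_lines = s_task(command=cmd)   # ShellTask.run accepts command=
```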
So, I guess the two questions are:
• How do I "get at" the input value of a Parameter in a flow (or do I not do this, and always pass that parameter to a @task and let it get handled there)?
• How do I create a ShellTask that takes args?
Jeff Brainerd
06/29/2020, 7:38 PM
Retrying State object? I couldn’t find any doc examples of this and wanted to know if I missed anything obvious before spelunking. Thanks! 🙏
Zach
06/29/2020, 8:47 PM
max_retries=3 to.
Kevin Weiler
06/29/2020, 9:42 PM
TypeError: Object of type datetime is not JSON serializable
The relevant code, outside the flow context, is:
@task(result=PrefectResult())
def assert_db_trigger(event_name: str, events_fetcher: DailyFlowEvents, point_date: datetime):
    events = events_fetcher.fetch_events()
    while event_name not in events.keys() or events[event_name] < point_date:
        print(f"No trigger, yet for {event_name}")
        sleep(5)
        events = events_fetcher.fetch_events()
    return point_date
and
point_datetime = datetime(year=point_date.year, month=point_date.month, day=point_date.day, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
and within the flow context:
task_wait_for_reference_data_validated = assert_db_trigger("reference_data_validated", daily_flow_events, point_datetime)
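PrefectResult stores a task's return value directly in the Prefect database as JSON, so with result=PrefectResult() the datetime returned by assert_db_trigger is a likely source of this TypeError. Assuming that is the cause, one option is to return an ISO 8601 string and parse it back downstream; to_json_safe and from_json_safe are hypothetical helpers:

```python
import json
from datetime import datetime, timezone


def to_json_safe(point_date: datetime) -> str:
    # ISO 8601 strings are JSON-serializable; datetime objects are not.
    return point_date.isoformat()


def from_json_safe(value: str) -> datetime:
    # datetime.fromisoformat (Python 3.7+) parses isoformat() output,
    # including a "+00:00" UTC offset.
    return datetime.fromisoformat(value)


if __name__ == "__main__":
    dt = datetime(2020, 6, 29, tzinfo=timezone.utc)
    try:
        json.dumps(dt)
    except TypeError as exc:
        print("raw datetime:", exc)
    print("as string:", json.dumps(to_json_safe(dt)))
```

Switching to a result type that pickles values rather than JSON-encoding them is another route to look at.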
Do all function/method arguments need to be JSON serializable?
Klemen Strojan
06/30/2020, 5:35 AM
prefect register -f flows/my_flow.py. I got the error Error: no such option: -f. When I register the flow with flow.register(), it becomes available in the UI, but I am unable to run it: the tasks are pending and I see this error: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'github'"). I am using 0.12.1 and Cloud. Any ideas?
Jacob Blanco
06/30/2020, 5:51 AM
import datetime

from prefect import Flow, Parameter, task


@task
def generate_dates(start_date, n_days):
    return [start_date - datetime.timedelta(days=x) for x in range(n_days)]


@task
def do_something(updated_at):
    ## DO SOME STUFF WITH THE DATE
    pass


with Flow("My Flow") as flow:
    start_date = Parameter('start_date')
    n_days = Parameter('n_days')
    dates = generate_dates(start_date, n_days)
    for run_date in dates:
        do_something(run_date)

flow.run(dict(start_date=..., n_days=10))  # start_date value not given
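Because dates is a Task placeholder while the Flow block runs, a plain Python for loop cannot iterate over the list it will eventually produce. If the start date and day count can be fixed when the flow is built (so they are ordinary Python values rather than Parameters), the loop can run at build time instead, creating one do_something task per date, each tracked separately. A sketch under that assumption, with the date logic as a plain function:

```python
import datetime


def generate_dates(start_date: datetime.date, n_days: int) -> list:
    """Plain-Python version of the generate_dates task above."""
    return [start_date - datetime.timedelta(days=x) for x in range(n_days)]


# Hypothetical build-time loop; requires concrete values, not Parameters:
#
# with Flow("My Flow") as flow:
#     for run_date in generate_dates(datetime.date(2020, 6, 30), 10):
#         do_something(run_date)  # each call adds a separate task to the flow
```

The trade-off is that the dates are baked in at registration time, so changing them means re-registering the flow.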
In this case I don't want to use map, for technical reasons. I could just implement the loop inside the task, but I like having all the timing tracked by Prefect Cloud.
Kai Weber
06/30/2020, 7:22 AM
Kai Weber
06/30/2020, 7:22 AM
josh
06/30/2020, 11:40 AM
server start command uses https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/docker-compose.yml; then add the container names and run docker-compose up to start the server.
Kai Weber
06/30/2020, 11:52 AM